package com.jie.flink.cdc.service;

import com.jie.flink.cdc.flinksource.MysqlDeserialization;
import com.jie.flink.cdc.flinksource.config.MysqlSourceConfigProperties;
import com.jie.flink.cdc.flinksource.config.SourceConfigWashMapper;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author zhanggj
 * @date 2023/5/25 18:33
 * @desc
 */
public class MysqlFlinkCdcService extends FlinkCdcService{
    
    private final MysqlSourceConfigProperties sourceConfigProperties;

    public MysqlFlinkCdcService(final MysqlSourceConfigProperties sourceConfigProperties) {
        this.sourceConfigProperties = sourceConfigProperties;
    }


    @Override
    protected DataStreamSource<String> buildFlinkStreamSource(final StreamExecutionEnvironment env) {
        return env.fromSource(buildMysqlDataSource(), WatermarkStrategy.noWatermarks(), "mysql-source")
                .setParallelism(4);
    }

    public MySqlSource<String> buildMysqlDataSource() {

        StartupOptions startupOptions = StartupOptions.initial();
        if (sourceConfigProperties.getInitReadIgnore()) {
            startupOptions = StartupOptions.latest();
        }

        return MySqlSource.<String>builder()
                .hostname(sourceConfigProperties.getHostName())
                .port(sourceConfigProperties.getPort())
                .username(sourceConfigProperties.getUserName())
                .password(sourceConfigProperties.getPassword())
                .databaseList(sourceConfigProperties.getDatabase())
                //多个表逗号分隔
                .tableList(SourceConfigWashMapper.tableNameWash(sourceConfigProperties.getTableArray()))
                //自定义返回结果集
                .deserializer(new MysqlDeserialization())
                //.serverTimeZone("UTC")
                .serverTimeZone("GMT+8")
                //.debeziumProperties(properties)
                //   StartupOptions.initial() 先全量后增量
                //   StartupOptions.latest() 从最新binlog读取，增量方式
                .startupOptions(startupOptions)
                // 1、伪装mysql slave：mysql配置的 server-id在mysql集群中要全局唯一
                // 2、在env添加数据源的并行度（setParallelism）时，如果并行度>1，需要配置范围
                // 例如并行度为4，要配置 311-313
                .serverId("311-313")
                .build();
    }
}
